
Graph-based workflow orchestration engine #1

Merged
DonPrus merged 55 commits into main from feat/orchestration
Mar 14, 2026

Conversation


@DonPrus DonPrus commented Mar 14, 2026

Summary

Complete orchestration engine with graph-based workflow execution and unified state model.

Core Engine

  • 7 node types: task, route, interrupt, agent, send, transform, subgraph
  • Graph execution: workflow defined as {nodes, edges, state_schema} with __start__/__end__ synthetic nodes
  • Conditional routing: route nodes evaluate state and select edges by match
  • Parallel fan-out: send nodes dispatch to multiple targets simultaneously
  • Subgraph execution: inline child workflow with input/output mapping (max depth 10)
  • Multi-turn agents: continuation prompts with configurable max_turns via A2A protocol
  • Command goto: workers override normal graph traversal by returning {"goto": "node_name"}
  • Deferred nodes: execute right before __end__ regardless of graph position
  • Breakpoints: interrupt_before/interrupt_after arrays pause execution for human review
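
As a sketch, a definition with this shape exercises most of the above. The `{nodes, edges, state_schema}` layout, the `source:value` conditional-edge convention, and the `__start__`/`__end__` synthetic nodes are from the engine; all node names, state keys, and config values here are illustrative:

```json
{
  "name": "triage",
  "state_schema": {
    "messages": { "reducer": "add_messages" },
    "severity": { "reducer": "last_value" }
  },
  "nodes": {
    "classify":  { "type": "task" },
    "pick_path": { "type": "route" },
    "escalate":  { "type": "agent", "max_turns": 3 },
    "close_out": { "type": "transform", "defer": true }
  },
  "edges": [
    ["__start__", "classify"],
    ["classify", "pick_path"],
    ["pick_path:high", "escalate"],
    ["pick_path:low", "close_out"],
    ["escalate", "__end__"],
    ["close_out", "__end__"]
  ]
}
```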

Unified State Model

  • 7 reducer types: last_value, append, merge, add, min, max, add_messages
  • add_messages: ID-based merge, replace, and remove for chat history
  • Overwrite bypass: {"__overwrite": true, "value": ...} skips reducer
  • Ephemeral keys: auto-clear after each step
  • Managed values: __meta injected before each node (step count, run_id, node_name, remaining_steps)
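
A minimal Python sketch of the reducer semantics above (not the engine's Zig implementation; `add_messages` is omitted since its ID-based merge logic is more involved):

```python
# Hypothetical sketch of reducer application. An update normally flows
# through the key's reducer; {"__overwrite": true, "value": ...} bypasses it.
def apply_update(state, key, update, reducer="last_value"):
    if isinstance(update, dict) and update.get("__overwrite"):
        state[key] = update["value"]  # overwrite bypass: raw value wins
        return state
    old = state.get(key)
    if reducer == "last_value":
        state[key] = update
    elif reducer == "append":
        state[key] = (old or []) + [update]
    elif reducer == "add":
        state[key] = (old or 0) + update
    elif reducer == "min":
        state[key] = update if old is None else min(old, update)
    elif reducer == "max":
        state[key] = update if old is None else max(old, update)
    elif reducer == "merge":
        merged = dict(old or {})
        merged.update(update)
        state[key] = merged
    return state

state = {}
apply_update(state, "score", 3, reducer="add")
apply_update(state, "score", 4, reducer="add")   # accumulates to 7
apply_update(state, "score", {"__overwrite": True, "value": 0}, reducer="add")
print(state["score"])  # 0 — bypass skipped the "add" reducer
```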

Checkpoints & Time Travel

  • State snapshot after every node execution
  • Fork: create new run from any checkpoint with modified state
  • Replay: re-execute from checkpoint within same run
  • Resume: continue interrupted run with optional state updates

Streaming

  • 5 SSE modes: values (full state), updates (partial), tasks (lifecycle), debug (everything), custom (user-defined)
  • Per-run event queues with thread-safe distribution

Worker Dispatch

  • 6 protocols: webhook, api_chat, openai_chat, a2a, mqtt, redis_stream
  • Tag-based worker selection with capacity tracking
  • A2A preference for agent nodes (JSON-RPC 2.0 tasks/send)
  • Per-node retry with configurable max_attempts
  • Node-level cache (FNV hash key + TTL)
  • Rate limit tracking per worker
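
An illustrative sketch of the FNV-hash cache key with TTL mentioned above; the engine's actual key layout and eviction may differ:

```python
# Hypothetical node-cache sketch: FNV-1a 64-bit hash of node name + input
# as the cache key, with a per-entry expiry timestamp.
import time

FNV_OFFSET = 0xcbf29ce484222325
FNV_PRIME = 0x100000001b3

def fnv1a_64(data: bytes) -> int:
    h = FNV_OFFSET
    for b in data:
        h ^= b
        h = (h * FNV_PRIME) & 0xFFFFFFFFFFFFFFFF
    return h

cache = {}  # key -> (expires_at, result)

def cache_store(node_name, input_json, result, ttl_s=60, now=None):
    now = time.time() if now is None else now
    key = fnv1a_64(f"{node_name}:{input_json}".encode())
    cache[key] = (now + ttl_s, result)

def cache_lookup(node_name, input_json, now=None):
    now = time.time() if now is None else now
    key = fnv1a_64(f"{node_name}:{input_json}".encode())
    hit = cache.get(key)
    if hit and hit[0] > now:
        return hit[1]  # fresh hit: caller can skip worker dispatch
    return None
```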

Operations

  • Workflow CRUD: create, list, get, update, delete workflow definitions
  • Hot reload: WorkflowWatcher polls workflows/ directory for JSON changes
  • Validation: reachability, cycle detection, state key references, route/send target checks
  • Mermaid export: generate diagram from workflow definition
  • Token accounting: cumulative input/output tokens per step and per run
  • Prometheus metrics: counters for runs, steps, errors
  • Configurable runs: workflow defaults + per-run overrides via config_json

Pull-Mode (NullTickets Integration)

  • Tracker thread polls NullTickets for tasks
  • Lease-based claiming with heartbeat
  • Workspace lifecycle hooks (after_create, before_run, after_run, before_remove)
  • Reconciliation: cancel run if upstream task reaches terminal state
  • Stall detection with configurable timeout

Infrastructure

  • MIT license
  • CI: tests on Ubuntu, macOS, Windows
  • Release builds for 7 targets (linux x86_64/aarch64/riscv64, macos x86_64/aarch64, windows x86_64/aarch64)
  • Docker images (linux/amd64, linux/arm64)
  • Branch protection on main (CI checks + PR review required)

DonPrus added 30 commits March 13, 2026 18:06
Add CRUD methods to Store for all new tables from migration 004:
- Workflow methods: create, get, list, update, delete
- Checkpoint methods: create, get, list, getLatest
- Agent event methods: create, listByRunStep
- Run state methods: updateRunState, incrementCheckpointCount,
  createRunWithState, createForkedRun
- Pending injection methods: create, consume, discard

Also adds PendingInjectionRow to types.zig and fixes migration 004
to preserve step_deps/cycle_state/saga_state tables until the engine
rewrite (Task 8) removes their usage.

Includes comprehensive tests for all new methods.
Replace the old 14-step-type engine with a graph-based state model
using 6 node types (task, route, interrupt, agent, send, transform).
The engine now processes workflows as a DAG with edges, applying
state updates through reducers and saving checkpoints after each node.

Key changes:
- processRun loops finding ready nodes until no more progress
- findReadyNodes with dead-node detection for conditional routing
- State flows through applyUpdates/reducers instead of templates
- Route nodes use conditional edges (source:value) for branching
- Interrupt nodes pause the run with checkpoint preservation
- Transform nodes apply static updates without worker dispatch
- Send nodes dispatch target_node per item from state array

Also adds state_json to RunRow and updates store queries to include it.
Add workflow CRUD (create/list/get/update/delete/validate/run),
checkpoint endpoints (list/get), state control (resume/fork/inject),
SSE stream snapshot, and agent events callback. Update handleGetRun
with state_json and checkpoint_count. Add SSE hub cleanup to cancel.
Remove old signal endpoint (replaced by state inject).
- Add AgentOpts struct and dispatchStepWithOpts() in dispatch.zig so
  webhook bodies for agent steps can include mode, callback_url,
  max_iterations, tools, and state fields
- Wire SseHub into main.zig: create instance on startup and set
  ctx.sse_hub for every request handler
- Add TODO in tracker.zig pollAndClaim noting workflow format update
  needed when nulltickets schema changes land (Task 14)
Add native support for nullclaw's Agent-to-Agent protocol (JSON-RPC 2.0
over /a2a endpoint) as a new worker protocol. Agent nodes now prefer
A2A-protocol workers for dispatch, falling back to other protocols when
no A2A worker is available.

- Add 'a2a' variant to worker_protocol.Protocol enum with URL builder
  that appends /a2a to the worker base URL
- Add buildA2aRequestBody() producing tasks/send JSON-RPC requests with
  contextId for session persistence
- Add parseA2aResponse() extracting text from result.artifacts with
  proper error handling for JSON-RPC errors and failed task status
- Route A2A responses through dedicated parser instead of generic
  worker_response.parse
- Engine's executeTaskNode prefers A2A workers for agent-type nodes
- All existing webhook/api_chat/openai_chat paths remain unchanged
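
For reference, a tasks/send request produced this way might look roughly like the following; the `contextId` field is the session handle mentioned above, while the id, role, and text values are purely illustrative and the exact message schema is nullclaw's A2A contract:

```json
{
  "jsonrpc": "2.0",
  "id": "req-1",
  "method": "tasks/send",
  "params": {
    "contextId": "run-42",
    "message": {
      "role": "user",
      "parts": [{ "type": "text", "text": "Summarize the incident." }]
    }
  }
}
```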
…, managed values

Gap 2: Per-node retry with exponential backoff (max_attempts, initial_interval_ms, backoff_factor, max_interval_ms)
Gap 3: Per-node cache with TTL (node_cache table, FNV hash cache keys, skip dispatch on hit)
Gap 4: Pending writes table for parallel execution resilience
Gap 5: Overwrite bypass (__overwrite: true) skips reducer in applyUpdates
Gap 6: Deferred nodes (defer: true) execute just before __end__
Gap 7: Managed values (__meta with step, is_last_step, remaining_steps, run_id, node_name)
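
The retry schedule implied by the Gap 2 knobs can be sketched as follows (the engine's own scheduling is non-blocking and DB-backed, so this only shows the delay arithmetic; default values are illustrative):

```python
# Hypothetical exponential-backoff delay for retry attempt N
# (attempt 1 = first retry), capped at max_interval_ms.
def retry_delay_ms(attempt, initial_interval_ms=500,
                   backoff_factor=2.0, max_interval_ms=30_000):
    delay = initial_interval_ms * (backoff_factor ** (attempt - 1))
    return min(int(delay), max_interval_ms)

print([retry_delay_ms(a) for a in range(1, 9)])
# capped growth: [500, 1000, 2000, 4000, 8000, 16000, 30000, 30000]
```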
Gap 8: Extend SseEvent with StreamMode enum (values, updates, tasks,
debug, custom). Engine broadcasts multi-mode events after each node
execution. Stream endpoint accepts ?mode= query param to filter.

Gap 9: Workflow version tracking in checkpoint metadata. On resume,
detect version mismatch and migrate completed_nodes by filtering
out nodes that no longer exist in the new workflow definition.
Store functions support versioned workflow CRUD.
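
The completed_nodes migration described above reduces to a filter; a minimal sketch with illustrative node names:

```python
# On a workflow version mismatch at resume time, drop completed nodes
# that no longer exist in the new workflow definition.
def migrate_completed_nodes(completed_nodes, new_workflow_nodes):
    current = set(new_workflow_nodes)
    return [n for n in completed_nodes if n in current]

migrate_completed_nodes(["fetch", "old_review", "publish"],
                        {"fetch", "publish", "notify"})
# → ["fetch", "publish"]  (old_review was removed in the new version)
```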

Gap 10: POST /runs/{id}/replay endpoint resets run state to a
checkpoint and marks it as running for the engine to pick up.
Validates checkpoint belongs to the target run.
Gap 1: WorkflowWatcher in workflow_loader.zig periodically scans
workflows_dir for changed JSON files (FNV1a hash comparison) and
upserts into the workflows table. Wired into engine tick loop and
main.zig startup.

Gap 2: Token accounting columns on runs/steps (migration 004).
Store methods updateStepTokens/updateRunTokens accumulate usage.
Engine extracts usage from worker responses and records per-step
and per-run totals. GET /runs/{id} includes token fields.

Gap 3: In-memory rate_limits map on Engine, populated from worker
response rate_limit objects. GET /rate-limits endpoint returns
current rate limit info for all workers.

Gap 4: Agent multi-turn loop checks turn_timeout_ms from node
config. If elapsed time exceeds the timeout, the loop stops and
uses the last successful response.
Gap 5: Add startupCleanup() to Tracker for workspace cleanup on start.
Gap 6: Add validateWorkspacePath() and sanitizeDirectoryName for symlink
       escape prevention; validate paths before workspace operations.
Gap 7: Add OrchestratorEvent struct with typed events; emit structured
       events at run/step lifecycle points and broadcast via SseHub.
Gap 8: Add validateConfig() check per engine tick; skip dispatch when
       no workers registered or store is unhealthy.
Remove backward-compatibility cruft from the pre-orchestration architecture:

Dead types removed:
- ChatMessageRow, SagaStateRow from types.zig

Dead store methods removed:
- getCycleState, upsertCycleState (cycle_state table)
- insertChatMessage, getChatMessages (chat_messages table)
- insertSagaState, updateSagaState, getSagaStates (saga_state table)
- getReadySteps, getStepDeps (step_deps-based DAG scheduling)
- setStepStartedAt (wait step timer tracking)

Dead template features removed:
- debate_responses, chat_history, role context fields and resolvers
- StepOutput.outputs field (fan_out/map multi-output)
- serializeOutputs helper function

Dead API handlers removed:
- handleApproveStep, handleRejectStep (approval steps -> 410 Gone)
- handleGetChatTranscript (group_chat -> removed)

Dead validation rules removed:
- loop, sub_workflow, wait, router, saga, debate, group_chat step type rules
- All associated error variants and tests

CLAUDE.md rewritten to reflect current architecture:
- 7 step types (not 14)
- 7 reducer types
- 35+ API endpoints (was 19)
- 4 migrations (was 2)
- Full module map with all 27 source files
- Unified state model, checkpoints, SSE streaming docs
Engine used "schema" key to look up state schema from workflow JSON,
but the API and validation modules use "state_schema". This caused
reducers to silently fall back to last_value for all API-created runs.

Add getSchemaJson() helper that checks both "state_schema" (canonical)
and "schema" (fallback for inline test workflows).

Remove dead code:
- isNodeDeferred (unused, collectDeferredNodes used instead)
- getNodeFieldBool (only caller was isNodeDeferred)
- mergeWorkflowVersionIntoMeta (unused, serializeRouteResultsWithVersion handles this)
- InvalidNumber error set declaration (error.InvalidNumber works via implicit sets)
- _field_name unused parameter in checkStateRefs
Test count is 322 after dead code removal. The canonical schema key
in workflow definitions is "state_schema", not "schema".
Pending state injections consumed between agent continuation turns
were discarded (assigned to _ and never applied). This caused
injections submitted via POST /runs/{id}/state during multi-turn
agent execution to be silently lost.

Re-save consumed injections so they are properly applied after the
full agent node completes its multi-turn loop.
This handler was defined but never wired to any API route. The
approval/signal step concept was replaced by interrupt + resume.
The legacy approve/reject routes already return 410 Gone.
- getActiveRuns no longer queries for 'paused' status which doesn't
  exist as a valid RunStatus enum value. Only 'running' is queried.
- Worker registration error message now lists all 6 supported protocols
  (was missing mqtt, redis_stream, a2a).
No backward compatibility needed. These endpoints were deprecated
in favor of interrupt + resume pattern. Remove the routes, their
test, and the now-unused seg5 path segment.
The approval step test was a leftover from the removed approve/reject
endpoint. No backward compatibility needed.
DonPrus added 4 commits March 13, 2026 22:08
The body of `for (ready_nodes) |node_name|` was at the same indent
level as the `for` statement itself, making it look like the code
was outside the loop. Re-indent the ~420 lines to proper nesting.
Extract max_nodes_per_tick (1000) and max_subgraph_depth (10)
into module-level constants for clarity and single-point-of-change.
Use cmd.exe instead of /bin/sh on Windows. Skip shell-dependent
tests on Windows since hook commands are Unix shell syntax.
@DonPrus DonPrus changed the title from Feat/orchestration to Graph-based workflow orchestration engine on Mar 14, 2026
DonPrus added 21 commits March 14, 2026 07:12
Replace std.Thread.sleep in retry loop with non-blocking retry
scheduling. Instead of blocking the engine thread, failed retry
attempts now create a step record with next_attempt_at_ms and
return control to the tick loop. Future ticks check the timestamp
and re-execute when the delay has elapsed.

Also fix stale checkpoint parent_id: latest_checkpoint was fetched
once at processRunWithDepth start, causing all checkpoints within
a tick to point to the same parent. Introduce latest_checkpoint_id
that updates after each checkpoint creation for correct chaining.
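
The tick-loop side of this change amounts to a timestamp gate; a hypothetical sketch (field name taken from the commit, step records illustrative):

```python
# A failed attempt records next_attempt_at_ms instead of sleeping; each
# tick re-executes only steps whose retry delay has elapsed.
def ready_for_retry(step, now_ms):
    return step.get("next_attempt_at_ms", 0) <= now_ms

steps = [
    {"id": 1, "next_attempt_at_ms": 1_000},
    {"id": 2, "next_attempt_at_ms": 5_000},
]
[s["id"] for s in steps if ready_for_retry(s, now_ms=2_000)]  # → [1]
```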
Remove the explicit ctx.allocator.free(sse_events) call which was
misleading since ctx.allocator is a per-request arena where free
is a no-op. Add comment explaining why inner strings (event_type,
data) must not be freed here as they originate from the engine's
per-tick arena, not the request arena.
validateConfig runs listWorkers + getActiveRuns every 200ms tick.
Cache the result with a timestamp, only re-validate every 30s.
On validation failure, immediately invalidate the cache so the
next tick re-checks.
executeSendNode was calling store.listWorkers and building
worker_infos for every item in the send array. Move both the
worker list fetch and worker_infos construction before the loop
since the worker list doesn't change between items.
Previously, run was created with 'pending' status then updated to
'running' in a separate operation. The engine could miss the run
between these two DB operations. Add createRunWithStateAndStatus
to store and use it in handleRunWorkflow to create the run directly
with 'running' status in a single INSERT.
When replaying from a checkpoint, steps and checkpoints created after
the replay point remained in the DB, causing stale data. Add
deleteStepsAfterTimestamp and deleteCheckpointsAfterVersion to the
store, and call both in handleReplayRun before resetting the run state.
workflow_json was parsed independently by ~15 helper calls per tick.
Parse it once at the top of processRunWithDepth and pre-extract the
state schema. Replace all getSchemaJson(alloc, workflow_json) calls
within the function with the cached value, eliminating ~9 redundant
JSON parses per node execution.
@DonPrus DonPrus merged commit 37392e0 into main Mar 14, 2026
3 checks passed